Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Prism] Use the worker-id gRPC metadata #33438

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

damondouglas
Copy link
Contributor

This PR closes #32167 via implementation of a worker.MultiplexW to forward FnAPI gRPC requests to *W stored by id that matches the worker-id gRPC context metadata. In addition to typical go test workflow, to validate this PR ./gradlew :runners:portability:java:ulrLoopbackValidatesRunnerTests -PjobEndpoint=localhost:8073 was ran on a few initial tests. The idle_shutdown_timeout was tested by visual inspection to validate the additional service does not block the executable from shutting down.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@damondouglas
Copy link
Contributor Author

damondouglas commented Dec 24, 2024

At 3636a3c, Python sends an empty string for worker_id key in the gRPC metadata. Debugging the workerFromMetadataCtx method, I see a worker_id sent over when running python apache_beam/examples/wordcount.py, except for a single instance.

The error worker id in ctx metadata is an empty string is specific to id == "". This is a conditional after grpcx.ReadWorkerID successfully passes all checks for available metadata.FromIncomingContext, and the metadata.MD has the worker_id key, and finally that the length of the metadata.MD[worker_id] is 1.

worker id in ctx metadata is an empty string

``` grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNKNOWN details = "worker id in ctx metadata is an empty string" debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"worker id in ctx metadata is an empty string", grpc_status:2, created_time:"2024-12-23T16:08:00.205761402-08:00"}" >

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
response = task()
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 386, in
lambda: self.create_worker().do_instruction(request), request)
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 657, in do_instruction
return getattr(self, request_type)(
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "sdks/python/apache_beam/runners/worker/bundle_processor.py", line 1274, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "sdks/python/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 567, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 260, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 1159, in apache_beam.runners.worker.operations.CombineOperation.process
File "apache_beam/runners/worker/operations.py", line 1163, in apache_beam.runners.worker.operations.CombineOperation.process
File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 260, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 1159, in apache_beam.runners.worker.operations.CombineOperation.process
File "apache_beam/runners/worker/operations.py", line 1163, in apache_beam.runners.worker.operations.CombineOperation.process
File "apache_beam/runners/worker/operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 260, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1591, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 689, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1686, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1799, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1591, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 917, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 1059, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1686, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1799, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1503, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1612, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 917, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 1000, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "sdks/python/apache_beam/runners/worker/bundle_processor.py", line 499, in getitem
self._cache[target_window] = self._side_input_data.view_fn(raw_view)
File "sdks/python/apache_beam/pvalue.py", line 389, in
lambda iterable: from_runtime_iterable(iterable, view_options))
File "sdks/python/apache_beam/pvalue.py", line 509, in _from_runtime_iterable
head = list(itertools.islice(it, 2))
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1289, in _lazy_iterator
input_stream, continuation_token = self._get_raw(
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1307, in _get_raw
self._underlying.get_raw(state_key, continuation_token))
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1094, in get_raw
response = self._blocking_request(
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1134, in _blocking_request
raise self._exception
File ".pyenv/versions/3.10.15/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File ".pyenv/versions/3.10.15/lib/python3.10/threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1069, in pull_responses
for response in responses:
File "sdks/python/.venv/lib/python3.10/site-packages/grpc/_channel.py", line 543, in next
return self._next()
File "sdks/python/.venv/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
raise self
RuntimeError: grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "worker id in ctx metadata is an empty string"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"worker id in ctx metadata is an empty string", grpc_status:2, created_time:"2024-12-23T16:08:00.205761402-08:00"}"

[while running 'Write/Write/WriteImpl/WriteBundles']


</details>

</summary>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[prism] Use the worker-id GRPC metadata
1 participant